home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / synchronize.py < prev    next >
Text File  |  2009-11-02  |  9KB  |  306 lines

  1. #
  2. # Module implementing synchronization primitives
  3. #
  4. # multiprocessing/synchronize.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
  7. #
  8.  
  9. __all__ = [
  10.     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
  11.     ]
  12.  
  13. import threading
  14. import os
  15. import sys
  16.  
  17. from time import time as _time, sleep as _sleep
  18.  
  19. import _multiprocessing
  20. from multiprocessing.process import current_process
  21. from multiprocessing.util import Finalize, register_after_fork, debug
  22. from multiprocessing.forking import assert_spawning, Popen
  23.  
  24. # Try to import the mp.synchronize module cleanly, if it fails
  25. # raise ImportError for platforms lacking a working sem_open implementation.
  26. # See issue 3770
  27. try:
  28.     from _multiprocessing import SemLock
  29. except (ImportError):
  30.     raise ImportError("This platform lacks a functioning sem_open" +
  31.                       " implementation, therefore, the required" +
  32.                       " synchronization primitives needed will not" +
  33.                       " function, see issue 3770.")
  34.  
  35. #
  36. # Constants
  37. #
  38.  
  39. RECURSIVE_MUTEX, SEMAPHORE = range(2)
  40. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  41.  
  42. #
  43. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  44. #
  45.  
  46. class SemLock(object):
  47.  
  48.     def __init__(self, kind, value, maxvalue):
  49.         sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
  50.         debug('created semlock with handle %s' % sl.handle)
  51.         self._make_methods()
  52.  
  53.         if sys.platform != 'win32':
  54.             def _after_fork(obj):
  55.                 obj._semlock._after_fork()
  56.             register_after_fork(self, _after_fork)
  57.  
  58.     def _make_methods(self):
  59.         self.acquire = self._semlock.acquire
  60.         self.release = self._semlock.release
  61.         self.__enter__ = self._semlock.__enter__
  62.         self.__exit__ = self._semlock.__exit__
  63.  
  64.     def __getstate__(self):
  65.         assert_spawning(self)
  66.         sl = self._semlock
  67.         return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
  68.  
  69.     def __setstate__(self, state):
  70.         self._semlock = _multiprocessing.SemLock._rebuild(*state)
  71.         debug('recreated blocker with handle %r' % state[0])
  72.         self._make_methods()
  73.  
  74. #
  75. # Semaphore
  76. #
  77.  
  78. class Semaphore(SemLock):
  79.  
  80.     def __init__(self, value=1):
  81.         SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
  82.  
  83.     def get_value(self):
  84.         return self._semlock._get_value()
  85.  
  86.     def __repr__(self):
  87.         try:
  88.             value = self._semlock._get_value()
  89.         except Exception:
  90.             value = 'unknown'
  91.         return '<Semaphore(value=%s)>' % value
  92.  
  93. #
  94. # Bounded semaphore
  95. #
  96.  
  97. class BoundedSemaphore(Semaphore):
  98.  
  99.     def __init__(self, value=1):
  100.         SemLock.__init__(self, SEMAPHORE, value, value)
  101.  
  102.     def __repr__(self):
  103.         try:
  104.             value = self._semlock._get_value()
  105.         except Exception:
  106.             value = 'unknown'
  107.         return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
  108.                (value, self._semlock.maxvalue)
  109.  
  110. #
  111. # Non-recursive lock
  112. #
  113.  
  114. class Lock(SemLock):
  115.  
  116.     def __init__(self):
  117.         SemLock.__init__(self, SEMAPHORE, 1, 1)
  118.  
  119.     def __repr__(self):
  120.         try:
  121.             if self._semlock._is_mine():
  122.                 name = current_process().name
  123.                 if threading.current_thread().name != 'MainThread':
  124.                     name += '|' + threading.current_thread().name
  125.             elif self._semlock._get_value() == 1:
  126.                 name = 'None'
  127.             elif self._semlock._count() > 0:
  128.                 name = 'SomeOtherThread'
  129.             else:
  130.                 name = 'SomeOtherProcess'
  131.         except Exception:
  132.             name = 'unknown'
  133.         return '<Lock(owner=%s)>' % name
  134.  
  135. #
  136. # Recursive lock
  137. #
  138.  
  139. class RLock(SemLock):
  140.  
  141.     def __init__(self):
  142.         SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
  143.  
  144.     def __repr__(self):
  145.         try:
  146.             if self._semlock._is_mine():
  147.                 name = current_process().name
  148.                 if threading.current_thread().name != 'MainThread':
  149.                     name += '|' + threading.current_thread().name
  150.                 count = self._semlock._count()
  151.             elif self._semlock._get_value() == 1:
  152.                 name, count = 'None', 0
  153.             elif self._semlock._count() > 0:
  154.                 name, count = 'SomeOtherThread', 'nonzero'
  155.             else:
  156.                 name, count = 'SomeOtherProcess', 'nonzero'
  157.         except Exception:
  158.             name, count = 'unknown', 'unknown'
  159.         return '<RLock(%s, %s)>' % (name, count)
  160.  
  161. #
  162. # Condition variable
  163. #
  164.  
  165. class Condition(object):
  166.  
  167.     def __init__(self, lock=None):
  168.         self._lock = lock or RLock()
  169.         self._sleeping_count = Semaphore(0)
  170.         self._woken_count = Semaphore(0)
  171.         self._wait_semaphore = Semaphore(0)
  172.         self._make_methods()
  173.  
  174.     def __getstate__(self):
  175.         assert_spawning(self)
  176.         return (self._lock, self._sleeping_count,
  177.                 self._woken_count, self._wait_semaphore)
  178.  
  179.     def __setstate__(self, state):
  180.         (self._lock, self._sleeping_count,
  181.          self._woken_count, self._wait_semaphore) = state
  182.         self._make_methods()
  183.  
  184.     def _make_methods(self):
  185.         self.acquire = self._lock.acquire
  186.         self.release = self._lock.release
  187.         self.__enter__ = self._lock.__enter__
  188.         self.__exit__ = self._lock.__exit__
  189.  
  190.     def __repr__(self):
  191.         try:
  192.             num_waiters = (self._sleeping_count._semlock._get_value() -
  193.                            self._woken_count._semlock._get_value())
  194.         except Exception:
  195.             num_waiters = 'unkown'
  196.         return '<Condition(%s, %s)>' % (self._lock, num_waiters)
  197.  
  198.     def wait(self, timeout=None):
  199.         assert self._lock._semlock._is_mine(), \
  200.                'must acquire() condition before using wait()'
  201.  
  202.         # indicate that this thread is going to sleep
  203.         self._sleeping_count.release()
  204.  
  205.         # release lock
  206.         count = self._lock._semlock._count()
  207.         for i in xrange(count):
  208.             self._lock.release()
  209.  
  210.         try:
  211.             # wait for notification or timeout
  212.             self._wait_semaphore.acquire(True, timeout)
  213.         finally:
  214.             # indicate that this thread has woken
  215.             self._woken_count.release()
  216.  
  217.             # reacquire lock
  218.             for i in xrange(count):
  219.                 self._lock.acquire()
  220.  
  221.     def notify(self):
  222.         assert self._lock._semlock._is_mine(), 'lock is not owned'
  223.         assert not self._wait_semaphore.acquire(False)
  224.  
  225.         # to take account of timeouts since last notify() we subtract
  226.         # woken_count from sleeping_count and rezero woken_count
  227.         while self._woken_count.acquire(False):
  228.             res = self._sleeping_count.acquire(False)
  229.             assert res
  230.  
  231.         if self._sleeping_count.acquire(False): # try grabbing a sleeper
  232.             self._wait_semaphore.release()      # wake up one sleeper
  233.             self._woken_count.acquire()         # wait for the sleeper to wake
  234.  
  235.             # rezero _wait_semaphore in case a timeout just happened
  236.             self._wait_semaphore.acquire(False)
  237.  
  238.     def notify_all(self):
  239.         assert self._lock._semlock._is_mine(), 'lock is not owned'
  240.         assert not self._wait_semaphore.acquire(False)
  241.  
  242.         # to take account of timeouts since last notify*() we subtract
  243.         # woken_count from sleeping_count and rezero woken_count
  244.         while self._woken_count.acquire(False):
  245.             res = self._sleeping_count.acquire(False)
  246.             assert res
  247.  
  248.         sleepers = 0
  249.         while self._sleeping_count.acquire(False):
  250.             self._wait_semaphore.release()        # wake up one sleeper
  251.             sleepers += 1
  252.  
  253.         if sleepers:
  254.             for i in xrange(sleepers):
  255.                 self._woken_count.acquire()       # wait for a sleeper to wake
  256.  
  257.             # rezero wait_semaphore in case some timeouts just happened
  258.             while self._wait_semaphore.acquire(False):
  259.                 pass
  260.  
  261. #
  262. # Event
  263. #
  264.  
  265. class Event(object):
  266.  
  267.     def __init__(self):
  268.         self._cond = Condition(Lock())
  269.         self._flag = Semaphore(0)
  270.  
  271.     def is_set(self):
  272.         self._cond.acquire()
  273.         try:
  274.             if self._flag.acquire(False):
  275.                 self._flag.release()
  276.                 return True
  277.             return False
  278.         finally:
  279.             self._cond.release()
  280.  
  281.     def set(self):
  282.         self._cond.acquire()
  283.         try:
  284.             self._flag.acquire(False)
  285.             self._flag.release()
  286.             self._cond.notify_all()
  287.         finally:
  288.             self._cond.release()
  289.  
  290.     def clear(self):
  291.         self._cond.acquire()
  292.         try:
  293.             self._flag.acquire(False)
  294.         finally:
  295.             self._cond.release()
  296.  
  297.     def wait(self, timeout=None):
  298.         self._cond.acquire()
  299.         try:
  300.             if self._flag.acquire(False):
  301.                 self._flag.release()
  302.             else:
  303.                 self._cond.wait(timeout)
  304.         finally:
  305.             self._cond.release()
  306.